package edu.ucla.cs.rpc.multicast.network;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.HashSet;
import java.util.Set;

import edu.ucla.cs.rpc.multicast.handlers.MessageHandler;
import edu.ucla.cs.rpc.multicast.network.message.Message;
import edu.ucla.cs.rpc.multicast.network.message.Message.MessageType;
import edu.ucla.cs.rpc.multicast.util.Network;

/**
 * MulticastMessageSender registers itself with a MulticastManager. The
 * MulticastManager keeps instances of MulticastMessageSender updated with the
 * current group membership list of receivers. Any message sent with this
 * class is delivered to every receiver known to this sender at the time of
 * the call.
 * <p>
 * The membership set is guarded by its own monitor: both the update handler
 * and {@link #send(Message)} synchronize on it before touching it.
 * 
 * @author Chase Covello Philip Russell
 * 
 */
public class MulticastMessageSender {

	/**
	 * This message handler receives receiver join/leave messages sent by the
	 * multicast manager to keep its local representation of the state of the
	 * multicast group updated.
	 * 
	 * @author Chase Covello Philip Russell
	 * 
	 */
	private final class MulticastMessageSenderMessageHandler implements
			MessageHandler {

		/**
		 * Receives join/leave messages and uses them to update the sender's
		 * state. The source address carried by the message is the address
		 * that is added to or removed from the receiver set; messages of any
		 * other type are ignored.
		 * 
		 * @param message
		 *            the membership update to apply.
		 */
		public void receive(Message message) {
			MessageType type = message.getType();

			if (type == MessageType.JOIN_RECEIVERS) {
				synchronized (receivers) {
					receivers.add(message.getSource());
				}
			} else if (type == MessageType.LEAVE_RECEIVERS) {
				synchronized (receivers) {
					receivers.remove(message.getSource());
				}
			}
		}
	}

	/** Address of the multicast manager this sender joined. */
	private final SocketAddress multicastManager;

	/** Randomly chosen identifier attached to this sender's join/leave messages. */
	private final long nodeID;

	/** Current receiver addresses; always accessed while synchronized on this set. */
	private final Set<SocketAddress> receivers;

	/** Receiver that delivers membership updates to the handler above. */
	private final MessageReceiver update;

	/**
	 * Construct a new MulticastMessageSender with the given multicast manager
	 * and announce it to that manager with a JOIN_SENDERS message.
	 * <p>
	 * If the join cannot be completed, the update receiver created by this
	 * constructor is shut down again before the exception propagates, because
	 * the caller never obtains a reference on which to call
	 * {@link #shutdown()}.
	 * 
	 * @param multicastManager
	 *            the multicast manager.
	 * @throws IOException
	 *             in the case of unrecoverable network errors.
	 */
	public MulticastMessageSender(SocketAddress multicastManager)
			throws IOException {
		this.multicastManager = multicastManager;
		receivers = new HashSet<SocketAddress>();

		// Randomly chosen node ID. It is not checked for uniqueness; a
		// collision with another node is possible but assumed to be very
		// unlikely.
		nodeID = Network.getRandomLong();

		// start a MessageReceiver to keep group membership list up to date
		update = new MessageReceiver(new MulticastMessageSenderMessageHandler());

		// join the multicast group; release the receiver again if that fails
		boolean joined = false;
		try {
			Message join = new Message(MessageType.JOIN_SENDERS,
					getSocketAddress());
			join.setSenderID(nodeID);
			join.send(multicastManager);
			joined = true;
		} finally {
			if (!joined) {
				// Construction is about to fail, so nobody else can ever
				// call shutdown() on this instance.
				update.shutdown();
			}
		}
	}

	/**
	 * Returns the ID of this MulticastMessageSender.
	 * 
	 * @return the node ID.
	 */
	public long getNodeID() {
		return nodeID;
	}

	/**
	 * Returns the socket address of the update receiver, i.e. the address
	 * this sender advertises in its join and leave messages.
	 * 
	 * @return the socket address.
	 */
	public SocketAddress getSocketAddress() {
		return update.getSocketAddress();
	}

	/**
	 * Send a multicast message to all members of the receiver group. This
	 * method guarantees reliable, in-order delivery of individual messages, but
	 * does NOT guarantee total ordering.
	 * <p>
	 * The membership lock is held for the whole loop, so membership updates
	 * and other calls to this method wait until it returns. The first
	 * {@link IOException} aborts the loop; receivers not yet reached do not
	 * get the message.
	 * 
	 * @param message
	 *            the message to send.
	 * @throws IOException
	 *             in the case of unrecoverable network errors.
	 */
	public void send(Message message) throws IOException {
		synchronized (receivers) {
			for (SocketAddress receiver : receivers) {
				message.send(receiver);
			}
		}
	}

	/**
	 * Shuts down any threads created by this message sender. This method
	 * <b>must</b> be called before disposing of all references to this object;
	 * otherwise, its thread will continue to run until the application is
	 * forcibly terminated.
	 * <p>
	 * Leaving the group is best effort: a failure to deliver the
	 * LEAVE_SENDERS message is reported on standard error and does not
	 * prevent the update receiver from being shut down.
	 */
	public void shutdown() {
		try {
			// leave the multicast group
			Message leave = new Message(MessageType.LEAVE_SENDERS,
					getSocketAddress());
			leave.setSenderID(nodeID);
			leave.send(multicastManager);
		} catch (IOException e) {
			// Deliberately not rethrown: shutdown must still complete.
			e.printStackTrace();
		} finally {
			update.shutdown();
		}
	}
}
